Skip to main content

Task Queue

You can extend Piscina's functionality by implementing a custom task queue. Instead of the default FIFO (First-In-First-Out) queue, it uses a priority queue to manage tasks.

The main script (index.js) defines a PriorityTaskQueue class that wraps the shuffled-priority-queue package. This class implements the necessary methods (push, remove, shift) that Piscina expects from a task queue. The queue prioritizes tasks based on a priority value specified in the task options.

note

Task priorities only affect queued tasks. If a worker is immediately available when a task is submitted, it will be processed regardless of its priority. The priority queue comes into play when there are more tasks than available workers.

index.js
'use strict';

const spq = require('shuffled-priority-queue');
const Piscina = require('piscina');
const { resolve } = require('path');

const kItem = Symbol('item');

class PriorityTaskQueue {
queue = spq();

get size () { return this.queue.length; }

push (value) {
const queueOptions = value[Piscina.queueOptionsSymbol];
const priority = queueOptions ? (queueOptions.priority || 0) : 0;
value[kItem] = this.queue.add({ priority, value });
}

remove (value) {
this.queue.remove(value[kItem]);
}

shift () {
return this.queue.shift().value;
}
}

const pool = new Piscina({
filename: resolve(__dirname, 'worker.js'),
taskQueue: new PriorityTaskQueue(),
maxThreads: 4
});

function makeTask (task, priority) {
return { ...task, [Piscina.queueOptionsSymbol]: { priority } };
}

(async () => {
// Submit enough tasks to ensure that at least some are queued
console.log(await Promise.all([
pool.run(makeTask({ priority: 1 }, 1)),
pool.run(makeTask({ priority: 2 }, 2)),
pool.run(makeTask({ priority: 3 }, 3)),
pool.run(makeTask({ priority: 4 }, 4)),
pool.run(makeTask({ priority: 5 }, 5)),
pool.run(makeTask({ priority: 6 }, 6)),
pool.run(makeTask({ priority: 7 }, 7)),
pool.run(makeTask({ priority: 8 }, 8))
]));
})();

The worker script (worker.js) simulates some work by sleeping for 100ms, then logs and returns the priority of the task it processed. This allows us to observe the order in which tasks are executed.

worker.js
'use strict';

const { promisify } = require('util');
const sleep = promisify(setTimeout);

module.exports = async ({ priority }) => {
await sleep(100);
process._rawDebug(`${priority}`);
return priority;
};

You can also check out this example on github.